/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package org.redkale.util;

import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.*;

/**
 * Lock-free concurrent hash map ported from JCTools' NonBlockingHashMap
 * (https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/maps/NonBlockingHashMap.java, version v3.3.0).
 *
 * <p>
 * 详情见: https://redkale.org
 *
 * @param <TypeK> the type of keys maintained by this map
 * @param <TypeV> the type of mapped values
 *
 * @author zhangjx
 * @since 2.5.0
 */
public class NonBlockingHashMap<TypeK, TypeV> extends AbstractMap<TypeK, TypeV> implements ConcurrentMap<TypeK, TypeV>, Cloneable, Serializable {

    private static final long serialVersionUID = 1234123412341234123L;

    private static final int REPROBE_LIMIT = 10; // Too many reprobes then force a table-resize

    // Unsafe handle used for all CAS and raw array writes; may be null on
    // platforms where Utility.unsafe() cannot obtain it (guards below check).
    private static final Unsafe UNSAFE = Utility.unsafe();
    // --- Bits to allow Unsafe access to arrays

    // Byte offset of element 0 in an Object[] (0 when Unsafe is unavailable).
    private static final int _Obase = UNSAFE == null ? 0 : UNSAFE.arrayBaseOffset(Object[].class);

    // Byte stride between adjacent Object[] elements (4 or 8 depending on pointer size).
    private static final int _Oscale = UNSAFE == null ? 0 : UNSAFE.arrayIndexScale(Object[].class);

    // log2 of _Oscale; 9999 is a poison value so an unexpected scale fails loudly.
    private static final int _Olog = _Oscale == 4 ? 2 : (_Oscale == 8 ? 3 : 9999);

    // Raw byte offset of element 'idx' inside 'ary', for Unsafe array access.
    private static long rawIndex(final Object[] ary, final int idx) {
        assert idx >= 0 && idx < ary.length;
        // Long math on purpose: byte offsets can exceed 2^31 for arrays of
        // more than ~268M (2^28) 8-byte reference elements.
        final long byteOffset = ((long) idx) << _Olog;
        return _Obase + byteOffset;
    }

    // --- Setup to use Unsafe
    private static final long _kvs_offset = fieldOffset(NonBlockingHashMap.class, "_kvs");

    /**
     * Returns the Unsafe field offset of the named declared field, or 0L when
     * Unsafe is unavailable.
     *
     * @param clz       the class declaring the field (was a raw {@code Class}; widened to {@code Class<?>})
     * @param fieldName the name of the declared field
     * @return the raw field offset, or 0L if UNSAFE is null
     * @throws RuntimeException if the field does not exist on {@code clz}
     */
    static long fieldOffset(Class<?> clz, String fieldName) throws RuntimeException {
        if (UNSAFE == null) return 0L;
        try {
            return UNSAFE.objectFieldOffset(clz.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            // Wrap the checked exception, preserving it as the cause.
            throw new RuntimeException(e);
        }
    }

    // Plain (non-volatile, non-ordered) store of element 'e' at raw byte
    // 'offset' within 'buffer'.  Caller is responsible for memory ordering.
    static <E> void spRefElement(E[] buffer, long offset, E e) {
        UNSAFE.putObject(buffer, offset, e);
    }

    // Atomically swap the whole table: replace _kvs only if it still equals
    // oldkvs.  Returns true on success.  Used by clear() and table-resize.
    private boolean CAS_kvs(final Object[] oldkvs, final Object[] newkvs) {
        return UNSAFE.compareAndSwapObject(this, _kvs_offset, oldkvs, newkvs);
    }

    // --- Prime ---------------------------------------------------------------
    // Junk wrapper class that adds a 'prime' bit onto Values by boxing them.
    // A Prime'd value marks its slot as being (possibly partially) copied to a
    // new table during a resize.
    private static final class Prime {

        final Object _V;

        Prime(Object value) {
            _V = value;
        }

        // Strip the Prime box if present; plain values pass through unchanged.
        static Object unbox(Object boxed) {
            if (boxed instanceof Prime) {
                return ((Prime) boxed)._V;
            }
            return boxed;
        }
    }

    // --- hash ----------------------------------------------------------------
    // Spread lousy hashCodes across the table.  Deliberately throws
    // NullPointerException for a null Key - this is the first convenient place
    // to raise the NPE the Map contract requires.
    private static int hash(final Object key) {
        int spread = key.hashCode();              // The real hashCode call
        spread ^= (spread >>> 20) ^ (spread >>> 12);
        spread ^= (spread >>> 7) ^ (spread >>> 4);
        spread += spread << 7; // smear low bits up high, for hashcodes that only differ by 1
        return spread;
    }

    // --- The Hash Table --------------------
    // Slot 0 is always used for a 'CHM' entry below to hold the interesting
    // bits of the hash table.  Slot 1 holds full hashes as an array of ints.
    // Slots {2,3}, {4,5}, etc hold {Key,Value} pairs.  The entire hash table
    // can be atomically replaced by CASing the _kvs field.
    //
    // Why is CHM buried inside the _kvs Object array, instead of the other way
    // around?  The CHM info is used during resize events and updates, but not
    // during standard 'get' operations.  I assume 'get' is much more frequent
    // than 'put'.  'get' can skip the extra indirection of skipping through the
    // CHM to reach the _kvs array.
    // NOTE(review): not volatile - it is replaced atomically via CAS_kvs
    // (Unsafe CAS on _kvs_offset) and read racily; readers tolerate a stale
    // table by chasing CHM._newkvs.
    private transient Object[] _kvs;

    // Extract the CHM (table-control structure) from slot 0 of a table.
    private static final CHM chm(Object[] kvs) {
        return (CHM) kvs[0];
    }

    // Extract the memoized full-hash array from slot 1 of a table.
    private static final int[] hashes(Object[] kvs) {
        return (int[]) kvs[1];
    }
    // Number of K,V pairs in the table

    // (kvs.length - 2 header slots) / 2 slots per pair.
    private static final int len(Object[] kvs) {
        return (kvs.length - 2) >> 1;
    }

    // Time since last resize
    // (wall-clock millis; set from System.currentTimeMillis() in initialize)
    private transient long _last_resize_milli;

    // --- Minimum table size ----------------
    // Pick size 8 K/V pairs, which turns into (8*2+2)*4+12 = 84 bytes on a
    // standard 32-bit HotSpot, and (8*2+2)*8+12 = 156 bytes on 64-bit Azul.
    private static final int MIN_SIZE_LOG = 3;             //

    private static final int MIN_SIZE = (1 << MIN_SIZE_LOG); // Must be power of 2

    // --- Sentinels -------------------------
    // Sentinels are compared by identity (==), never by equals(), so fresh
    // anonymous Objects are sufficient and can never collide with user data.
    // No-Match-Old - putIfMatch does updates only if it matches the old value,
    // and NO_MATCH_OLD basically counts as a wildcard match.
    private static final Object NO_MATCH_OLD = new Object(); // Sentinel
    // Match-Any-not-null - putIfMatch does updates only if it find a real old
    // value.

    private static final Object MATCH_ANY = new Object(); // Sentinel
    // This K/V pair has been deleted (but the Key slot is forever claimed).
    // The same Key can be reinserted with a new value later.

    public static final Object TOMBSTONE = new Object();
    // Prime'd or box'd version of TOMBSTONE.  This K/V pair was deleted, then a
    // table resize started.  The K/V pair has been marked so that no new
    // updates can happen to the old table (and since the K/V pair was deleted
    // nothing was copied to the new table).

    private static final Prime TOMBPRIME = new Prime(TOMBSTONE);

    // --- key,val -------------------------------------------------------------
    // Access K,V for a given idx
    //
    // Note that these are static, so that the caller is forced to read the _kvs
    // field only once, and share that read across all key/val calls - lest the
    // _kvs field move out from under us and back-to-back key & val calls refer
    // to different _kvs arrays.
    // Key for pair idx lives at array slot (idx*2)+2 (after CHM and hashes).
    private static final Object key(Object[] kvs, int idx) {
        return kvs[(idx << 1) + 2];
    }

    // Value for pair idx lives at array slot (idx*2)+3, right after its key.
    private static final Object val(Object[] kvs, int idx) {
        return kvs[(idx << 1) + 3];
    }

    // CAS the key slot for pair idx from 'old' to 'key'; keys, once set,
    // are never changed again (see the witness discussion in putIfMatch0).
    private static final boolean CAS_key(Object[] kvs, int idx, Object old, Object key) {
        return UNSAFE.compareAndSwapObject(kvs, rawIndex(kvs, (idx << 1) + 2), old, key);
    }

    // CAS the value slot for pair idx from 'old' to 'val'.
    private static final boolean CAS_val(Object[] kvs, int idx, Object old, Object val) {
        return UNSAFE.compareAndSwapObject(kvs, rawIndex(kvs, (idx << 1) + 3), old, val);
    }

    // --- dump ----------------------------------------------------------------
    // Verbose printout of table internals, useful for debugging. 
    // Dumps only the live entries (via print2), bracketed by separator lines.
    public final void print() {
        System.out.println("=========");
        print2(_kvs);
        System.out.println("=========");
    }
    // print the entire state of the table

    // Raw slot dump: prints every claimed key slot, including tombstoned keys
    // ("XXX") and prime'd/tombstoned values, then recurses into the new table
    // if a resize is in progress.  NOTE(review): appears unreferenced in this
    // chunk (print() calls print2) - kept as a debugging aid.
    private final void print(Object[] kvs) {
        for (int i = 0; i < len(kvs); i++) {
            Object K = key(kvs, i);
            if (K != null) {
                String KS = (K == TOMBSTONE) ? "XXX" : K.toString();
                Object V = val(kvs, i);
                Object U = Prime.unbox(V);
                String p = (V == U) ? "" : "prime_";
                String US = (U == TOMBSTONE) ? "tombstone" : U.toString();
                System.out.println("" + i + " (" + KS + "," + p + US + ")");
            }
        }
        Object[] newkvs = chm(kvs)._newkvs; // New table, if any
        if (newkvs != null) {
            System.out.println("----");
            print(newkvs);
        }
    }
    // print only the live values, broken down by the table they are in
    private final void print2(Object[] kvs) {
        final int tableLen = len(kvs);
        for (int i = 0; i < tableLen; i++) {
            final Object key = key(kvs, i);
            final Object val = val(kvs, i);
            final Object unboxed = Prime.unbox(val);
            // Live means: key slot claimed by a real key AND value not deleted.
            final boolean keyIsLive = key != null && key != TOMBSTONE;
            final boolean valIsLive = val != null && unboxed != TOMBSTONE;
            if (keyIsLive && valIsLive) {
                final String p = (val == unboxed) ? "" : "prime_";
                System.out.println("" + i + " (" + key + "," + p + val + ")");
            }
        }
        // Recurse into the new table if a resize is in progress.
        final Object[] newkvs = chm(kvs)._newkvs;
        if (newkvs != null) {
            System.out.println("----");
            print2(newkvs);
        }
    }

    // Count of reprobes
    private transient ConcurrentAutoTable _reprobes = new ConcurrentAutoTable();

    /** Get and clear the current count of reprobes. Reprobes happen on key
     * collisions, and a high reprobe rate may indicate a poor hash function or
     * weaknesses in the table resizing function.
     *
     * @return the count of reprobes since the last call to {@link #reprobes}
     *         or since the table was created. */
    public long reprobes() {
        // NOTE(review): the read and the reset below are not atomic; reprobes
        // added by other threads between the two statements are dropped.
        long r = _reprobes.get();
        _reprobes = new ConcurrentAutoTable();
        return r;
    }

    // --- reprobe_limit -----------------------------------------------------
    // Heuristic to decide if we have reprobed toooo many times.  Running over
    // the reprobe limit on a 'get' call acts as a 'miss'; on a 'put' call it
    // can trigger a table resize.  Several places must have exact agreement on
    // what the reprobe_limit is, so we share it here.
    // Limit = 10 + len/16, i.e. a fixed floor plus a small table-size bonus.
    private static int reprobe_limit(int len) {
        return REPROBE_LIMIT + (len >> 4);
    }

    // --- NonBlockingHashMap --------------------------------------------------
    // Constructors
    /** Create a new NonBlockingHashMap with default minimum size (currently set
     * to 8 K/V pairs or roughly 84 bytes on a standard 32-bit JVM). */
    public NonBlockingHashMap() {
        this(MIN_SIZE);
    }

    /** Create a new NonBlockingHashMap with initial room for the given number of
     * elements, thus avoiding internal resizing operations to reach an
     * appropriate size.  Large numbers here when used with a small count of
     * elements will sacrifice space for a small amount of time gained.  The
     * initial size will be rounded up internally to the next larger power of 2.
     * @param initial_sz the expected number of elements (must be &gt;= 0)
     */
    public NonBlockingHashMap(final int initial_sz) {
        initialize(initial_sz);
    }

    // Allocate and install a fresh table sized for 'initial_sz' elements.
    // Throws IllegalArgumentException for negative sizes.
    private final void initialize(int initial_sz) {
        if (initial_sz < 0) {
            throw new IllegalArgumentException("initial_sz: " + initial_sz + " (expected: >= 0)");
        }
        int i;                      // Convert to next largest power-of-2
        // Clamp absurd requests; the shift below would overflow otherwise.
        if (initial_sz > 1024 * 1024) initial_sz = 1024 * 1024;
        // Find smallest power-of-2 >= 4*initial_sz (initial_sz << 2), with a
        // floor of MIN_SIZE, so the table starts out lightly loaded.
        for (i = MIN_SIZE_LOG; (1 << i) < (initial_sz << 2); i++) ;
        // Double size for K,V pairs, add 1 for CHM and 1 for hashes
        _kvs = new Object[((1 << i) << 1) + 2];
        _kvs[0] = new CHM(new ConcurrentAutoTable()); // CHM in slot 0
        _kvs[1] = new int[1 << i];          // Matching hash entries
        _last_resize_milli = System.currentTimeMillis();
    }
    // Version for subclassed readObject calls, to be called after the defaultReadObject

    protected final void initialize() {
        initialize(MIN_SIZE);
    }

    // --- wrappers ------------------------------------------------------------
    /** Returns the number of key-value mappings in this map.
     *
     * @return the number of key-value mappings in this map */
    @Override
    public int size() {
        return chm(_kvs).size();
    }

    /** Returns {@code size() == 0}.
     *
     * @return {@code size() == 0} */
    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    /** Tests if the key is in the table, using the {@code equals} method.
     *
     * @return {@code true} if the key is in the table using the {@code equals} method
     * @throws NullPointerException if the specified key is null */
    @Override
    public boolean containsKey(Object key) {
        // Valid because this map never maps a key to a null value.
        return get(key) != null;
    }

    /** Legacy method testing if some key maps into the specified value in this
     * table. This method is identical in functionality to {@link
     *  #containsValue}, and exists solely to ensure full compatibility with
     * class {@link java.util.Hashtable}, which supported this method prior to
     * introduction of the Java Collections framework.
     *
     * @param val a value to search for
     *
     * @return {@code true} if this map maps one or more keys to the specified value
     * @throws NullPointerException if the specified value is null */
    public boolean contains(Object val) {
        return containsValue(val);
    }

    /** Maps the specified key to the specified value in the table. Neither key
     * nor value can be null.
     * <p>
     * The value can be retrieved by calling {@link #get} with a key that is
     * equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param val value to be associated with the specified key
     *
     * @return the previous value associated with {@code key}, or
     * {@code null} if there was no mapping for {@code key}
     * @throws NullPointerException if the specified key or value is null */
    @Override
    public TypeV put(TypeK key, TypeV val) {
        // NO_MATCH_OLD == unconditional update.
        return putIfMatch(key, val, NO_MATCH_OLD);
    }

    /** Atomically, do a {@link #put} if-and-only-if the key is not mapped.
     * Useful to ensure that only a single mapping for the key exists, even if
     * many threads are trying to create the mapping in parallel.
     *
     * @return the previous value associated with the specified key,
     *         or {@code null} if there was no mapping for the key
     * @throws NullPointerException if the specified key or value is null */
    @Override
    public TypeV putIfAbsent(TypeK key, TypeV val) {
        // Expecting TOMBSTONE means "only install if currently absent/deleted".
        return putIfMatch(key, val, TOMBSTONE);
    }

    /** Removes the key (and its corresponding value) from this map.
     * This method does nothing if the key is not in the map.
     *
     * @return the previous value associated with {@code key}, or
     * {@code null} if there was no mapping for {@code key}
     * @throws NullPointerException if the specified key is null */
    @Override
    public TypeV remove(Object key) {
        // Removal is an unconditional put of the TOMBSTONE sentinel.
        return putIfMatch(key, TOMBSTONE, NO_MATCH_OLD);
    }

    /** Atomically do a {@link #remove(Object)} if-and-only-if the key is mapped
     * to a value which is <code>equals</code> to the given value.
     *
     * @throws NullPointerException if the specified key or value is null */
    public boolean remove(Object key, Object val) {
        // Succeeded iff the old value we displaced equals the expected one.
        return objectsEquals(putIfMatch(key, TOMBSTONE, val), val);
    }

    /** Atomically do a <code>put(key,val)</code> if-and-only-if the key is
     * mapped to some value already.
     *
     * @throws NullPointerException if the specified key or value is null */
    @Override
    public TypeV replace(TypeK key, TypeV val) {
        // MATCH_ANY == only update when some real mapping already exists.
        return putIfMatch(key, val, MATCH_ANY);
    }

    /** Atomically do a <code>put(key,newValue)</code> if-and-only-if the key is
     * mapped a value which is <code>equals</code> to <code>oldValue</code>.
     *
     * @throws NullPointerException if the specified key or value is null */
    @Override
    public boolean replace(TypeK key, TypeV oldValue, TypeV newValue) {
        return objectsEquals(putIfMatch(key, newValue, oldValue), oldValue);
    }

    /** Null-safe equality check used by the conditional remove/replace paths. */
    private static boolean objectsEquals(Object a, Object b) {
        // Identical semantics to the hand-rolled
        // (a == b) || (a != null && a.equals(b)) - delegate to the stdlib.
        return Objects.equals(a, b);
    }

    // Atomically replace newVal for oldVal, returning the value that existed
    // there before.  If the oldVal matches the returned value, then newVal was
    // inserted, otherwise not.  A null oldVal means the key does not exist (only
    // insert if missing); a null newVal means to remove the key.
    public final TypeV putIfMatchAllowNull(Object key, Object newVal, Object oldVal) {
        // Translate null arguments into the internal TOMBSTONE sentinel.
        if (oldVal == null) oldVal = TOMBSTONE;
        if (newVal == null) newVal = TOMBSTONE;
        final TypeV res = (TypeV) putIfMatch0(this, _kvs, key, newVal, oldVal);
        assert !(res instanceof Prime);
        //assert res != null;
        // Map the internal "deleted/absent" sentinel back to null for callers.
        return res == TOMBSTONE ? null : res;
    }

    /** Atomically replace newVal for oldVal, returning the value that existed
     * there before.  If the oldVal matches the returned value, then newVal was
     * inserted, otherwise not.
     *
     * @param key key
     * @param newVal the new value to install (or {@link #TOMBSTONE} to delete)
     * @param oldVal the expected old value, or a sentinel
     *               ({@code NO_MATCH_OLD}/{@code MATCH_ANY}/{@code TOMBSTONE})
     * @return the previous value associated with the specified key,
     *         or {@code null} if there was no mapping for the key
     * @throws NullPointerException if the key or either value is null
     */
    public final TypeV putIfMatch(Object key, Object newVal, Object oldVal) {
        if (oldVal == null || newVal == null) throw new NullPointerException();
        final Object res = putIfMatch0(this, _kvs, key, newVal, oldVal);
        assert !(res instanceof Prime);
        assert res != null;
        // TOMBSTONE from the internal call means "no prior mapping".
        return res == TOMBSTONE ? null : (TypeV) res;
    }

    /** Copies all of the mappings from the specified map into this one,
     * overwriting any existing mappings for the same keys.
     *
     * @param m mappings to be stored in this map */
    @Override
    public void putAll(Map<? extends TypeK, ? extends TypeV> m) {
        for (Map.Entry<? extends TypeK, ? extends TypeV> entry : m.entrySet()) {
            put(entry.getKey(), entry.getValue());
        }
    }

    /** Removes all of the mappings from this map. */
    @Override
    public void clear() {         // Smack a new empty table down
        // Build a fresh minimum-size table, then CAS it in; retry until the
        // swap wins over any concurrent table replacement (e.g. a resize).
        Object[] newkvs = new NonBlockingHashMap(MIN_SIZE)._kvs;
        while (!CAS_kvs(_kvs, newkvs)) // Spin until the clear works
      ;
    }

    /** Returns {@code true} if this Map maps one or more keys to the specified
     * value.  <em>Note</em>: This method requires a full internal traversal of the
     * hash table and is much slower than {@link #containsKey}.
     *
     * @param val value whose presence in this map is to be tested
     *
     * @return {@code true} if this map maps one or more keys to the specified value
     * @throws NullPointerException if the specified value is null */
    @Override
    public boolean containsValue(final Object val) {
        if (val == null) {
            throw new NullPointerException();
        }
        // Linear scan over live values: cheap identity check first, then equals.
        for (TypeV candidate : values()) {
            if (candidate == val || candidate.equals(val)) {
                return true;
            }
        }
        return false;
    }

    // This function is supposed to do something for Hashtable, and the JCK
    // tests hang until it gets called... by somebody ... for some reason,
    // any reason....
    // Intentionally a no-op: this map resizes itself internally.
    protected void rehash() {
    }

    /**
     * Creates a shallow copy of this hashtable. All the structure of the
     * hashtable itself is copied, but the keys and values are not cloned.
     * This is a relatively expensive operation.
     *
     * @return a clone of the hashtable.
     */
    @Override
    public Object clone() {
        try {
            // Must clone, to get the class right; NBHM might have been
            // extended so it would be wrong to just make a new NBHM.
            NonBlockingHashMap<TypeK, TypeV> t = (NonBlockingHashMap<TypeK, TypeV>) super.clone();
            // But I don't have an atomic clone operation - the underlying _kvs
            // structure is undergoing rapid change.  If I just clone the _kvs
            // field, the CHM in _kvs[0] won't be in sync.
            //
            // Wipe out the cloned array (it was shallow anyways).
            t.clear();
            // Now copy sanely
            for (TypeK K : keySet()) {
                final TypeV V = get(K);  // Do an official 'get'
                // A concurrent remove() may have deleted K between the keySet
                // snapshot and this get(); put(K, null) would throw NPE, so
                // skip entries that vanished mid-copy.
                if (V != null) t.put(K, V);
            }
            return t;
        } catch (CloneNotSupportedException e) {
            // this shouldn't happen, since we are Cloneable - but don't
            // discard the cause entirely.
            throw new InternalError(e.toString());
        }
    }

    /**
     * Returns a string representation of this map: the key-value mappings in
     * entry-set iteration order, enclosed in braces ({@code "{}"}) and
     * separated by {@code ", "}.  Each mapping is rendered as the key, an
     * equals sign ({@code "="}), then the value, converted to strings as by
     * {@link String#valueOf(Object)}.  A key or value that is this map itself
     * is rendered as {@code "(this Map)"}.
     *
     * @return a string representation of this map
     */
    @Override
    public String toString() {
        final StringBuilder buf = new StringBuilder("{");
        String sep = "";
        for (Entry<TypeK, TypeV> e : entrySet()) {
            buf.append(sep);
            final TypeK key = e.getKey();
            final TypeV value = e.getValue();
            buf.append(key == this ? "(this Map)" : key);
            buf.append('=');
            buf.append(value == this ? "(this Map)" : value);
            sep = ", "; // separator precedes every entry after the first
        }
        return buf.append('}').toString();
    }

    // --- keyeq ---------------------------------------------------------------
    // Check for key equality.  Try direct pointer compare first, then see if
    // the hashes are unequal (fast negative test) and finally do the full-on
    // 'equals' v-call.
    // NOTE: the 'hash' parameter is the table INDEX into the memoized 'hashes'
    // array; 'fullhash' is the spread hash of the probe key.
    private static boolean keyeq(Object K, Object key, int[] hashes, int hash, int fullhash) {
        return K == key
            || // Either keys match exactly OR
            // hash exists and matches?  hash can be zero during the install of a
            // new key/value pair.
            ((hashes[hash] == 0 || hashes[hash] == fullhash)
            && // Do not call the users' "equals()" call with a Tombstone, as this can
            // surprise poorly written "equals()" calls that throw exceptions
            // instead of simply returning false.
            K != TOMBSTONE
            && // Do not call users' equals call with a Tombstone
            // Do the match the hard way - with the users' key being the loop-
            // invariant "this" pointer.  I could have flipped the order of
            // operands (since equals is commutative), but I'm making mega-morphic
            // v-calls in a re-probing loop and nailing down the 'this' argument
            // gives both the JIT and the hardware a chance to prefetch the call target.
            key.equals(K));          // Finally do the hard match
    }

    // --- get -----------------------------------------------------------------
    /** Returns the value to which the specified key is mapped, or {@code null}
     * if this map contains no mapping for the key.
     * <p>
     * More formally, if this map contains a mapping from a key {@code k} to
     * a value {@code v} such that {@code key.equals(k)}, then this method
     * returns {@code v}; otherwise it returns {@code null}. (There can be at
     * most one such mapping.)
     *
     * @param key the key whose mapped value is sought
     * @return the mapped value, or {@code null} when absent
     * @throws NullPointerException if the specified key is null */
    // Never returns a Prime nor a Tombstone.
    @Override
    public TypeV get(Object key) {
        final Object V = get_impl(this, _kvs, key);
        assert !(V instanceof Prime); // Never return a Prime
        assert V != TOMBSTONE;
        return (TypeV) V;
    }

    // Core lock-free lookup.  Probes 'kvs' linearly from the hashed index;
    // on a prime'd slot or a reprobe overflow it restarts in the next table.
    // Static on purpose: forces one read of _kvs shared by all key/val calls.
    private static final Object get_impl(final NonBlockingHashMap topmap, final Object[] kvs, final Object key) {
        final int fullhash = hash(key); // throws NullPointerException if key is null
        final int len = len(kvs); // Count of key/value pairs, reads kvs.length
        final CHM chm = chm(kvs); // The CHM, for a volatile read below; reads slot 0 of kvs
        final int[] hashes = hashes(kvs); // The memoized hashes; reads slot 1 of kvs

        int idx = fullhash & (len - 1); // First key hash

        // Main spin/reprobe loop, looking for a Key hit
        int reprobe_cnt = 0;
        while (true) {
            // Probe table.  Each read of 'val' probably misses in cache in a big
            // table; hopefully the read of 'key' then hits in cache.
            final Object K = key(kvs, idx); // Get key   before volatile read, could be null
            final Object V = val(kvs, idx); // Get value before volatile read, could be null or Tombstone or Prime
            if (K == null) return null;   // A clear miss

            // We need a volatile-read here to preserve happens-before semantics on
            // newly inserted Keys.  If the Key body was written just before inserting
            // into the table a Key-compare here might read the uninitialized Key body.
            // Annoyingly this means we have to volatile-read before EACH key compare.
            // .
            // We also need a volatile-read between reading a newly inserted Value
            // and returning the Value (so the user might end up reading the stale
            // Value contents).  Same problem as with keys - and the one volatile
            // read covers both.
            final Object[] newkvs = chm._newkvs; // VOLATILE READ before key compare

            // Key-compare
            if (keyeq(K, key, hashes, idx, fullhash)) {
                // Key hit!  Check for no table-copy-in-progress
                if (!(V instanceof Prime)) // No copy?
                    return (V == TOMBSTONE) ? null : V; // Return the value
                // Key hit - but slot is (possibly partially) copied to the new table.
                // Finish the copy & retry in the new table.
                return get_impl(topmap, chm.copy_slot_and_check(topmap, kvs, idx, key), key); // Retry in the new table
            }
            // get and put must have the same key lookup logic!  But only 'put'
            // needs to force a table-resize for a too-long key-reprobe sequence.
            // Check for too-many-reprobes on get - and flip to the new table.
            if (++reprobe_cnt >= reprobe_limit(len)
                || // too many probes
                K == TOMBSTONE) // found a TOMBSTONE key, means no more keys in this table
                return newkvs == null ? null : get_impl(topmap, topmap.help_copy(newkvs), key); // Retry in the new table

            idx = (idx + 1) & (len - 1);    // Reprobe by 1!  (could now prefetch)
        }
    }

    // --- getk -----------------------------------------------------------------
    /** Returns the canonical Key instance stored in the map for the given key,
     * or {@code null} if this map contains no mapping for the key.  Useful for
     * key interning.
     *
     * @param key the key to look up
     * @return the stored key instance equal to {@code key}, or {@code null}
     * @throws NullPointerException if the specified key is null */
    // Never returns a Prime nor a Tombstone.
    public TypeK getk(TypeK key) {
        return (TypeK) getk_impl(this, _kvs, key);
    }

    // Core lookup for getk: identical probe sequence to get_impl, but returns
    // the stored Key object instead of the Value and never needs to unbox
    // Primes (keys are never prime'd).
    private static final Object getk_impl(final NonBlockingHashMap topmap, final Object[] kvs, final Object key) {
        final int fullhash = hash(key); // throws NullPointerException if key is null
        final int len = len(kvs); // Count of key/value pairs, reads kvs.length
        final CHM chm = chm(kvs); // The CHM, for a volatile read below; reads slot 0 of kvs
        final int[] hashes = hashes(kvs); // The memoized hashes; reads slot 1 of kvs

        int idx = fullhash & (len - 1); // First key hash

        // Main spin/reprobe loop, looking for a Key hit
        int reprobe_cnt = 0;
        while (true) {
            // Probe table.
            final Object K = key(kvs, idx); // Get key before volatile read, could be null
            if (K == null) return null;   // A clear miss

            // We need a volatile-read here to preserve happens-before semantics on
            // newly inserted Keys.  If the Key body was written just before inserting
            // into the table a Key-compare here might read the uninitialized Key body.
            // Annoyingly this means we have to volatile-read before EACH key compare.
            // .
            // We also need a volatile-read between reading a newly inserted Value
            // and returning the Value (so the user might end up reading the stale
            // Value contents).  Same problem as with keys - and the one volatile
            // read covers both.
            final Object[] newkvs = chm._newkvs; // VOLATILE READ before key compare

            // Key-compare
            if (keyeq(K, key, hashes, idx, fullhash))
                return K;              // Return existing Key!

            // get and put must have the same key lookup logic!  But only 'put'
            // needs to force a table-resize for a too-long key-reprobe sequence.
            // Check for too-many-reprobes on get - and flip to the new table.
            if (++reprobe_cnt >= reprobe_limit(len)
                || // too many probes
                K == TOMBSTONE) { // found a TOMBSTONE key, means no more keys in this table
                return newkvs == null ? null : getk_impl(topmap, topmap.help_copy(newkvs), key); // Retry in the new table
            }

            idx = (idx + 1) & (len - 1);    // Reprobe by 1!  (could now prefetch)
        }
    }

    // NOTE(review): presumably written/read purely for its volatile memory-ordering
    // effect (its numeric value is never meaningful); its use site is not visible
    // in this chunk - confirm against putIfMatch0/CHM before relying on this.
    static volatile int DUMMY_VOLATILE;

    /**
     * Put, Remove, PutIfAbsent, etc. Return the old value. If the returned value is equal to expVal (or expVal is
     * {@link #NO_MATCH_OLD}) then the put can be assumed to work (although might have been immediately overwritten).
     * Only the path through copy_slot passes in an expected value of null, and putIfMatch only returns a null if passed
     * in an expected null.
     *
     * @param topmap the map to act on
     * @param kvs    the KV table snapshot we act on
     * @param key    not null (will result in {@link NullPointerException})
     * @param putval the new value to use. Not null. {@link #TOMBSTONE} will result in deleting the entry.
     * @param expVal expected old value. Can be null. {@link #NO_MATCH_OLD} for an unconditional put/remove.
     *               {@link #TOMBSTONE} if we expect old entry to not exist(null/{@link #TOMBSTONE} value).
     *               {@link #MATCH_ANY} will ignore the current value, but only if an entry exists. A null expVal is used
     *               internally to perform a strict insert-if-never-been-seen-before operation.
     *
     * @return {@link #TOMBSTONE} if key does not exist or match has failed. null if expVal is
     *         null AND old value was null. Otherwise the old entry value (not null).
     */
    private static final Object putIfMatch0(
        final NonBlockingHashMap topmap,
        final Object[] kvs,
        final Object key,
        final Object putval,
        final Object expVal) {
        assert putval != null;
        assert !(putval instanceof Prime);
        assert !(expVal instanceof Prime);
        final int fullhash = hash(key); // throws NullPointerException if key null
        final int len = len(kvs); // Count of key/value pairs, reads kvs.length
        final CHM chm = chm(kvs); // Reads kvs[0]
        final int[] hashes = hashes(kvs); // Reads kvs[1], read before kvs[0]
        int idx = fullhash & (len - 1);

        // ---
        // Phase 1 of 2: Key-Claim stanza: spin till we can claim a Key (or force a resizing).
        int reprobe_cnt = 0;
        Object K = null, V = null;
        Object[] newkvs = null;
        while (true) {             // Spin till we get a Key slot
            V = val(kvs, idx);         // Get old value (before volatile read below!)
            K = key(kvs, idx);         // Get current key
            if (K == null) {         // Slot is free?
                // Found an empty Key slot - which means this Key has never been in
                // this table.  No need to put a Tombstone - the Key is not here!
                if (putval == TOMBSTONE) return TOMBSTONE; // Not-now & never-been in this table
                if (expVal == MATCH_ANY) return TOMBSTONE; // Will not match, even after K inserts
                // Claim the null key-slot
                if (CAS_key(kvs, idx, null, key)) { // Claim slot for Key
                    chm._slots.add(1);      // Raise key-slots-used count
                    hashes[idx] = fullhash; // Memoize fullhash
                    break;                  // Got it!
                }
                // CAS to claim the key-slot failed.
                //
                // This re-read of the Key points out an annoying short-coming of Java
                // CAS.  Most hardware CAS's report back the existing value - so that
                // if you fail you have a *witness* - the value which caused the CAS to
                // fail.  The Java API turns this into a boolean destroying the
                // witness.  Re-reading does not recover the witness because another
                // thread can write over the memory after the CAS.  Hence we can be in
                // the unfortunate situation of having a CAS fail *for cause* but
                // having that cause removed by a later store.  This turns a
                // non-spurious-failure CAS (such as Azul has) into one that can
                // apparently spuriously fail - and we avoid apparent spurious failure
                // by not allowing Keys to ever change.

                // Volatile read, to force loads of K to retry despite JIT, otherwise
                // it is legal to e.g. haul the load of "K = key(kvs,idx);" outside of
                // this loop (since failed CAS ops have no memory ordering semantics).
                int dummy = DUMMY_VOLATILE;
                continue;
            }
            // Key slot was not null, there exists a Key here

            // We need a volatile-read here to preserve happens-before semantics on
            // newly inserted Keys.  If the Key body was written just before inserting
            // into the table a Key-compare here might read the uninitialized Key body.
            // Annoyingly this means we have to volatile-read before EACH key compare.
            newkvs = chm._newkvs;     // VOLATILE READ before key compare

            if (keyeq(K, key, hashes, idx, fullhash))
                break;                  // Got it!

            // get and put must have the same key lookup logic!  Lest 'get' give
            // up looking too soon.
            //topmap._reprobes.add(1);
            if (++reprobe_cnt >= reprobe_limit(len)
                || // too many probes or
                K == TOMBSTONE) { // found a TOMBSTONE key, means no more keys
                // We simply must have a new table to do a 'put'.  At this point a
                // 'get' will also go to the new table (if any).  We do not need
                // to claim a key slot (indeed, we cannot find a free one to claim!).
                newkvs = chm.resize(topmap, kvs);
                // A null expVal means we were called from copy_slot; the copier must
                // not recurse into help_copy here.
                if (expVal != null) topmap.help_copy(newkvs); // help along an existing copy
                return putIfMatch0(topmap, newkvs, key, putval, expVal);
            }

            idx = (idx + 1) & (len - 1); // Reprobe!
        } // End of spinning till we get a Key slot

        // Phase 2 of 2: spin till we install the Value in the claimed slot.
        while (true) {              // Spin till we insert a value
            // ---
            // Found the proper Key slot, now update the matching Value slot.  We
            // never put a null, so Value slots monotonically move from null to
            // not-null (deleted Values use Tombstone).  Thus if 'V' is null we
            // fail this fast cutout and fall into the check for table-full.
            if (putval == V) return V; // Fast cutout for no-change

            // See if we want to move to a new table (to avoid high average re-probe
            // counts).  We only check on the initial set of a Value from null to
            // not-null (i.e., once per key-insert).  Of course we got a 'free' check
            // of newkvs once per key-compare (not really free, but paid-for by the
            // time we get here).
            if (newkvs == null
                && // New table-copy already spotted?
                // Once per fresh key-insert check the hard way
                ((V == null && chm.tableFull(reprobe_cnt, len))
                || // Or we found a Prime, but the JMM allowed reordering such that we
                // did not spot the new table (very rare race here: the writing
                // thread did a CAS of _newkvs then a store of a Prime.  This thread
                // reads the Prime, then reads _newkvs - but the read of Prime was so
                // delayed (or the read of _newkvs was so accelerated) that they
                // swapped and we still read a null _newkvs.  The resize call below
                // will do a CAS on _newkvs forcing the read.
                V instanceof Prime)) {
                newkvs = chm.resize(topmap, kvs); // Force the new table copy to start
            }
            // See if we are moving to a new table.
            // If so, copy our slot and retry in the new table.
            if (newkvs != null) {
                return putIfMatch0(topmap, chm.copy_slot_and_check(topmap, kvs, idx, expVal), key, putval, expVal);
            }
            // ---
            // We are finally prepared to update the existing table
            assert !(V instanceof Prime);

            // Must match old, and we do not?  Then bail out now.  Note that either V
            // or expVal might be TOMBSTONE.  Also V can be null, if we've never
            // inserted a value before.  expVal can be null if we are called from
            // copy_slot.
            // Match rules: NO_MATCH_OLD always matches; an identical reference matches;
            // MATCH_ANY matches any live (non-null, non-TOMBSTONE) value; expVal==TOMBSTONE
            // matches a missing entry (V null); otherwise fall back to equals().
            if (expVal != NO_MATCH_OLD
                && // Do we care about expected-Value at all?
                V != expVal
                && // No instant match already?
                (expVal != MATCH_ANY || V == TOMBSTONE || V == null)
                && !(V == null && expVal == TOMBSTONE)
                && // Match on null/TOMBSTONE combo
                (expVal == null || !expVal.equals(V))) { // Expensive equals check at the last
                return (V == null) ? TOMBSTONE : V;         // Do not update!
            }

            // Actually change the Value in the Key,Value pair
            if (CAS_val(kvs, idx, V, putval)) break;

            // CAS failed
            // Because we have no witness, we do not know why it failed.
            // Indeed, by the time we look again the value under test might have flipped
            // a thousand times and now be the expected value (despite the CAS failing).
            // Check for the never-succeed condition of a Prime value and jump to any
            // nested table, or else just re-run.
            // We would not need this load at all if CAS returned the value on which
            // the CAS failed (AKA witness). The new CAS semantics are supported via
            // VarHandle in JDK9.
            V = val(kvs, idx);         // Get new value

            // If a Prime'd value got installed, we need to re-run the put on the
            // new table.  Otherwise we lost the CAS to another racing put.
            if (V instanceof Prime)
                return putIfMatch0(topmap, chm.copy_slot_and_check(topmap, kvs, idx, expVal), key, putval, expVal);

            // Simply retry from the start.
            // NOTE: need the fence, since otherwise 'val(kvs,idx)' load could be hoisted
            // out of loop.
            int dummy = DUMMY_VOLATILE;
        }

        // CAS succeeded - we did the update!
        // Both normal put's and table-copy calls putIfMatch, but table-copy
        // does not (effectively) increase the number of live k/v pairs.
        if (expVal != null) {
            // Adjust sizes - a striped counter
            if ((V == null || V == TOMBSTONE) && putval != TOMBSTONE) chm._size.add(1);
            if (!(V == null || V == TOMBSTONE) && putval == TOMBSTONE) chm._size.add(-1);
        }

        // We won; we know the update happened as expected.
        // Map the internal null (never-present) to TOMBSTONE for external callers;
        // the copy_slot path (expVal == null) keeps the raw null.
        return (V == null && expVal != null) ? TOMBSTONE : V;
    }

    // --- help_copy ---------------------------------------------------------
    // Help along an existing resize operation.  This is just a fast cut-out
    // wrapper, to encourage inlining for the fast no-copy-in-progress case.  We
    // always help the top-most table copy, even if there are nested table
    // copies in progress.
    private final Object[] help_copy(Object[] helper) {
        // Single volatile read of the current top-level table; we help that copy
        // even if the copy completes and another table is promoted under us.
        final Object[] top = _kvs;
        final CHM topchm = chm(top);
        if (topchm._newkvs != null) { // Only act when a copy is actually in progress
            topchm.help_copy_impl(this, top, false);
        }
        // Always hand back the caller's table reference, untouched.
        return helper;
    }

    // --- CHM -----------------------------------------------------------------
    // The control structure for the NonBlockingHashMap
    private static final class CHM<TypeK, TypeV> {
        // Size in active K,V pairs

        private final ConcurrentAutoTable _size;

        // Approximate count of live K,V pairs (striped counter; exact only when quiescent).
        public int size() {
            return (int) _size.get();
        }

        // ---
        // These next 2 fields are used in the resizing heuristics, to judge when
        // it is time to resize or copy the table.  Slots is a count of used-up
        // key slots, and when it nears a large fraction of the table we probably
        // end up reprobing too much.  Last-resize-milli is the time since the
        // last resize; if we are running back-to-back resizes without growing
        // (because there are only a few live keys but many slots full of dead
        // keys) then we need a larger table to cut down on the churn.
        // Count of used slots, to tell when table is full of dead unusable slots
        private final ConcurrentAutoTable _slots;

        public int slots() {
            return (int) _slots.get();
        }

        // ---
        // New mappings, used during resizing.
        // The 'new KVs' array - created during a resize operation.  This
        // represents the new table being copied from the old one.  It's the
        // volatile variable that is read as we cross from one table to the next,
        // to get the required memory orderings.  It monotonically transits from
        // null to set (once).
        volatile Object[] _newkvs;

        private static final AtomicReferenceFieldUpdater<CHM, Object[]> _newkvsUpdater
            = AtomicReferenceFieldUpdater.newUpdater(CHM.class, Object[].class, "_newkvs");
        // Set the _next field if we can.

        // Install 'newkvs' as the one-and-only new table.  Returns true only for
        // the single winning thread; losers see a non-null _newkvs and return false.
        boolean CAS_newkvs(Object[] newkvs) {
            while (_newkvs == null)
                if (_newkvsUpdater.compareAndSet(this, null, newkvs))
                    return true;
            return false;
        }

        // Sometimes many threads race to create a new very large table.  Only 1
        // wins the race, but the losers all allocate a junk large table with
        // hefty allocation costs.  Attempt to control the overkill here by
        // throttling attempts to create a new table.  I cannot really block here
        // (lest I lose the non-blocking property) but late-arriving threads can
        // give the initial resizing thread a little time to allocate the initial
        // new table.  The Right Long Term Fix here is to use array-lets and
        // incrementally create the new very large array.  In C I'd make the array
        // with malloc (which would mmap under the hood) which would only eat
        // virtual-address and not real memory - and after Somebody wins then we
        // could in parallel initialize the array.  Java does not allow
        // un-initialized array creation (especially of ref arrays!).
        volatile long _resizers; // count of threads attempting an initial resize

        private static final AtomicLongFieldUpdater<CHM> _resizerUpdater
            = AtomicLongFieldUpdater.newUpdater(CHM.class, "_resizers");

        // ---
        // Simple constructor.  The size counter is shared with the prior table so
        // the map's size survives a resize.
        CHM(ConcurrentAutoTable size) {
            _size = size;
            _slots = new ConcurrentAutoTable();
        }

        // --- tableFull ---------------------------------------------------------
        // Heuristic to decide if this table is too full, and we should start a
        // new table.  Note that if a 'get' call has reprobed too many times and
        // decided the table must be full, then always the estimate_sum must be
        // high and we must report the table is full.  If we do not, then we might
        // end up deciding that the table is not full and inserting into the
        // current table, while a 'get' has decided the same key cannot be in this
        // table because of too many reprobes.  The invariant is:
        //   slots.estimate_sum >= max_reprobe_cnt >= reprobe_limit(len)
        private final boolean tableFull(int reprobe_cnt, int len) {
            return // Do the cheap check first: we allow some number of reprobes always
                reprobe_cnt >= REPROBE_LIMIT
                && (reprobe_cnt >= reprobe_limit(len)
                || // More expensive check: see if the table is > 1/2 full.
                _slots.estimate_get() >= (len >> 1));
        }

        // --- resize ------------------------------------------------------------
        // Resizing after too many probes.  "How Big???" heuristics are here.
        // Callers (not this routine) will 'help_copy' any in-progress copy.
        // Since this routine has a fast cutout for copy-already-started, callers
        // MUST 'help_copy' lest we have a path which forever runs through
        // 'resize' only to discover a copy-in-progress which never progresses.
        private final Object[] resize(NonBlockingHashMap topmap, Object[] kvs) {
            assert chm(kvs) == this;

            // Check for resize already in progress, probably triggered by another thread
            Object[] newkvs = _newkvs; // VOLATILE READ
            if (newkvs != null) // See if resize is already in progress
                return newkvs;           // Use the new table already

            // No copy in-progress, so start one.  First up: compute new table size.
            int oldlen = len(kvs);    // Old count of K,V pairs allowed
            int sz = size();          // Get current table count of active K,V pairs
            int newsz = sz;           // First size estimate

            // Heuristic to determine new size.  We expect plenty of dead-slots-with-keys
            // and we need some decent padding to avoid endless reprobing.
            if (sz >= (oldlen >> 2)) { // If we are >25% full of keys then...
                newsz = oldlen << 1;      // Double size, so new table will be between 12.5% and 25% full
                // For tables less than 1M entries, if >50% full of keys then...
                // For tables more than 1M entries, if >75% full of keys then...
                if (4L * sz >= ((oldlen >> 20) != 0 ? 3L : 2L) * oldlen)
                    newsz = oldlen << 2;    // Double-double size, so the new table starts well under 25% full
            }
            // This heuristic in the next 2 lines leads to a much denser table
            // with a higher reprobe rate
            //if( sz >= (oldlen>>1) ) // If we are >50% full of keys then...
            //  newsz = oldlen<<1;    // Double size

            // Last (re)size operation was very recent?  Then double again
            // despite having few live keys; slows down resize operations
            // for tables subject to a high key churn rate - but do not
            // forever grow the table.  If there is a high key churn rate
            // the table needs a steady state of rare same-size resize
            // operations to clean out the dead keys.
            long tm = System.currentTimeMillis();
            if (newsz <= oldlen
                && // New table would shrink or hold steady?
                tm <= topmap._last_resize_milli + 10000) // Recent resize (less than 10 sec ago)
                newsz = oldlen << 1;      // Double the existing size
            // Do not shrink, ever.  If we hit this size once, assume we
            // will again.
            if (newsz < oldlen) newsz = oldlen;
            // Convert to power-of-2
            int log2;
            for (log2 = MIN_SIZE_LOG; (1 << log2) < newsz; log2++) ; // Compute log2 of size
            long len = ((1L << log2) << 1) + 2;
            // prevent integer overflow - limit of 2^31 elements in a Java array
            // so here, 2^30 + 2 is the largest number of elements in the hash table
            if ((int) len != len) {
                log2 = 30;
                len = (1L << log2) + 2;
                if (sz > ((len >> 2) + (len >> 1))) throw new RuntimeException("Table is full.");
            }

            // Now limit the number of threads actually allocating memory to a
            // handful - lest we have 750 threads all trying to allocate a giant
            // resized array.
            long r = _resizers;
            while (!_resizerUpdater.compareAndSet(this, r, r + 1))
                r = _resizers;
            // Size calculation: 2 words (K+V) per table entry, plus a handful.  We
            // guess at 64-bit pointers; 32-bit pointers screws up the size calc by
            // 2x but does not screw up the heuristic very much.
            long megs = ((((1L << log2) << 1) + 8) << 3/* word to bytes */) >> 20/* megs */;
            if (r >= 2 && megs > 0) { // Already 2 guys trying; wait and see
                newkvs = _newkvs;        // Between dorking around, another thread did it
                if (newkvs != null) // See if resize is already in progress
                    return newkvs;         // Use the new table already
                // TODO - use a wait with timeout, so we'll wakeup as soon as the new table
                // is ready, or after the timeout in any case.
                // For now, sleep a tad and see if the 2 guys already trying to make
                // the table actually get around to making it happen.
                try {
                    Thread.sleep(megs);
                } catch (Exception e) {
                    // ignored: this sleep is a best-effort throttle only; an
                    // interruption merely shortens the wait and we proceed.
                }
            }
            // Last check, since the 'new' below is expensive and there is a chance
            // that another thread slipped in a new table while we ran the heuristic.
            newkvs = _newkvs;
            if (newkvs != null) // See if resize is already in progress
                return newkvs;          // Use the new table already

            // Double size for K,V pairs, add 1 for CHM
            newkvs = new Object[(int) len]; // This can get expensive for big arrays
            newkvs[0] = new CHM(_size); // CHM in slot 0
            newkvs[1] = new int[1 << log2]; // hashes in slot 1

            // Another check after the slow allocation
            if (_newkvs != null) // See if resize is already in progress
                return _newkvs;         // Use the new table already

            // The new table must be CAS'd in so only 1 winner amongst duplicate
            // racing resizing threads.  Extra CHM's will be GC'd.
            if (CAS_newkvs(newkvs)) { // NOW a resize-is-in-progress!
                //notifyAll();            // Wake up any sleepers
                //long nano = System.nanoTime();
                //System.out.println(" "+nano+" Resize from "+oldlen+" to "+(1<<log2)+" and had "+(_resizers-1)+" extras" );
                //if( System.out != null ) System.out.print("["+log2);
                topmap.rehash();        // Call for Hashtable's benefit
            } else // CAS failed?
                newkvs = _newkvs;       // Reread new table
            return newkvs;
        }

        // The next part of the table to copy.  It monotonically transits from zero
        // to _kvs.length.  Visitors to the table can claim 'work chunks' by
        // CAS'ing this field up, then copying the indicated indices from the old
        // table to the new table.  Workers are not required to finish any chunk;
        // the counter simply wraps and work is copied duplicately until somebody
        // somewhere completes the count.
        volatile long _copyIdx = 0;

        static private final AtomicLongFieldUpdater<CHM> _copyIdxUpdater
            = AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyIdx");

        // Work-done reporting.  Used to efficiently signal when we can move to
        // the new table.  From 0 to len(oldkvs) refers to copying from the old
        // table to the new.
        volatile long _copyDone = 0;

        static private final AtomicLongFieldUpdater<CHM> _copyDoneUpdater
            = AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyDone");

        // --- help_copy_impl ----------------------------------------------------
        // Help along an existing resize operation.  We hope its the top-level
        // copy (it was when we started) but this CHM might have been promoted out
        // of the top position.
        private final void help_copy_impl(NonBlockingHashMap topmap, Object[] oldkvs, boolean copy_all) {
            assert chm(oldkvs) == this;
            Object[] newkvs = _newkvs;
            assert newkvs != null;    // Already checked by caller
            int oldlen = len(oldkvs); // Total amount to copy
            final int MIN_COPY_WORK = Math.min(oldlen, 1024); // Limit per-thread work

            // ---
            int panic_start = -1;
            int copyidx = -9999;            // Fool javac to think it's initialized
            while (_copyDone < oldlen) { // Still needing to copy?
                // Carve out a chunk of work.  The counter wraps around so every
                // thread eventually tries to copy every slot repeatedly.

                // We "panic" if we have tried TWICE to copy every slot - and it still
                // has not happened.  i.e., twice some thread somewhere claimed they
                // would copy 'slot X' (by bumping _copyIdx) but they never claimed to
                // have finished (by bumping _copyDone).  Our choices become limited:
                // we can wait for the work-claimers to finish (and become a blocking
                // algorithm) or do the copy work ourselves.  Tiny tables with huge
                // thread counts trying to copy the table often 'panic'.
                if (panic_start == -1) { // No panic?
                    copyidx = (int) _copyIdx;
                    while (!_copyIdxUpdater.compareAndSet(this, copyidx, copyidx + MIN_COPY_WORK))
                        copyidx = (int) _copyIdx;      // Re-read
                    if (!(copyidx < (oldlen << 1))) // Panic!
                        panic_start = copyidx;        // Record where we started to panic-copy
                }

                // We now know what to copy.  Try to copy.
                int workdone = 0;
                for (int i = 0; i < MIN_COPY_WORK; i++)
                    if (copy_slot(topmap, (copyidx + i) & (oldlen - 1), oldkvs, newkvs)) // Made an oldtable slot go dead?
                        workdone++;         // Yes!
                if (workdone > 0) // Report work-done occasionally
                    copy_check_and_promote(topmap, oldkvs, workdone);// See if we can promote
                //for( int i=0; i<MIN_COPY_WORK; i++ )
                //  if( copy_slot(topmap,(copyidx+i)&(oldlen-1),oldkvs,newkvs) ) // Made an oldtable slot go dead?
                //    copy_check_and_promote( topmap, oldkvs, 1 );// See if we can promote

                copyidx += MIN_COPY_WORK;
                // Uncomment these next 2 lines to turn on incremental table-copy.
                // Otherwise this thread continues to copy until it is all done.
                if (!copy_all && panic_start == -1) // No panic?
                    return;       // Then done copying after doing MIN_COPY_WORK
            }
            // Extra promotion check, in case another thread finished all copying
            // then got stalled before promoting.
            copy_check_and_promote(topmap, oldkvs, 0);// See if we can promote
        }

        // --- copy_slot_and_check -----------------------------------------------
        // Copy slot 'idx' from the old table to the new table.  If this thread
        // confirmed the copy, update the counters and check for promotion.
        //
        // Returns the result of reading the volatile _newkvs, mostly as a
        // convenience to callers.  We come here with 1-shot copy requests
        // typically because the caller has found a Prime, and has not yet read
        // the _newkvs volatile - which must have changed from null-to-not-null
        // before any Prime appears.  So the caller needs to read the _newkvs
        // field to retry his operation in the new table, but probably has not
        // read it yet.
        // 'should_help' is null exactly when called from the copy machinery
        // itself (copy_slot), in which case we must not recurse into help_copy.
        private final Object[] copy_slot_and_check(NonBlockingHashMap topmap, Object[] oldkvs, int idx, Object should_help) {
            assert chm(oldkvs) == this;
            Object[] newkvs = _newkvs; // VOLATILE READ
            // We're only here because the caller saw a Prime, which implies a
            // table-copy is in progress.
            assert newkvs != null;
            if (copy_slot(topmap, idx, oldkvs, _newkvs)) // Copy the desired slot
                copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
            // Generically help along any copy (except if called recursively from a helper)
            return (should_help == null) ? newkvs : topmap.help_copy(newkvs);
        }

        // --- copy_check_and_promote --------------------------------------------
        // Fold 'workdone' freshly-copied slots into _copyDone; if that completes
        // the copy of the top-level table, CAS-promote _newkvs to be the new _kvs.
        private final void copy_check_and_promote(NonBlockingHashMap topmap, Object[] oldkvs, int workdone) {
            assert chm(oldkvs) == this;
            int oldlen = len(oldkvs);
            // We made a slot unusable and so did some of the needed copy work
            long copyDone = _copyDone;
            assert (copyDone + workdone) <= oldlen;
            if (workdone > 0) {
                while (!_copyDoneUpdater.compareAndSet(this, copyDone, copyDone + workdone)) {
                    copyDone = _copyDone; // Reload, retry
                    assert (copyDone + workdone) <= oldlen;
                }
            }

            // Check for copy being ALL done, and promote.  Note that we might have
            // nested in-progress copies and manage to finish a nested copy before
            // finishing the top-level copy.  We only promote top-level copies.
            if (copyDone + workdone == oldlen
                && // Ready to promote this table?
                topmap._kvs == oldkvs
                && // Looking at the top-level table?
                // Attempt to promote
                topmap.CAS_kvs(oldkvs, _newkvs)) {
                topmap._last_resize_milli = System.currentTimeMillis(); // Record resize time for next check
            }
        }

        // --- copy_slot ---------------------------------------------------------
        // Copy one K/V pair from oldkvs[i] to newkvs.  Returns true if we can
        // confirm that we set an old-table slot to TOMBPRIME, and only returns after
        // updating the new table.  We need an accurate confirmed-copy count so
        // that we know when we can promote (if we promote the new table too soon,
        // other threads may 'miss' on values not-yet-copied from the old table).
        // We don't allow any direct updates on the new table, unless they first
        // happened to the old table - so that any transition in the new table from
        // null to not-null must have been from a copy_slot (or other old-table
        // overwrite) and not from a thread directly writing in the new table.
        private boolean copy_slot(NonBlockingHashMap topmap, int idx, Object[] oldkvs, Object[] newkvs) {
            // Blindly set the key slot from null to TOMBSTONE, to eagerly stop
            // fresh put's from inserting new values in the old table when the old
            // table is mid-resize.  We don't need to act on the results here,
            // because our correctness stems from box'ing the Value field.  Slamming
            // the Key field is a minor speed optimization.
            Object key;
            while ((key = key(oldkvs, idx)) == null)
                CAS_key(oldkvs, idx, null, TOMBSTONE);

            // ---
            // Prevent new values from appearing in the old table.
            // Box what we see in the old table, to prevent further updates.
            Object oldval = val(oldkvs, idx); // Read OLD table
            while (!(oldval instanceof Prime)) {
                final Prime box = (oldval == null || oldval == TOMBSTONE) ? TOMBPRIME : new Prime(oldval);
                if (CAS_val(oldkvs, idx, oldval, box)) { // CAS down a box'd version of oldval
                    // If we made the Value slot hold a TOMBPRIME, then we both
                    // prevented further updates here but also the (absent)
                    // oldval is vacuously available in the new table.  We
                    // return with true here: any thread looking for a value for
                    // this key can correctly go straight to the new table and
                    // skip looking in the old table.
                    if (box == TOMBPRIME)
                        return true;
                    // Otherwise we boxed something, but it still needs to be
                    // copied into the new table.
                    oldval = box;         // Record updated oldval
                    break;                // Break loop; oldval is now boxed by us
                }
                oldval = val(oldkvs, idx); // Else try, try again
            }
            if (oldval == TOMBPRIME) return false; // Copy already complete here!

            // ---
            // Copy the value into the new table, but only if we overwrite a null.
            // If another value is already in the new table, then somebody else
            // wrote something there and that write is happens-after any value that
            // appears in the old table.
            Object old_unboxed = ((Prime) oldval)._V;
            assert old_unboxed != TOMBSTONE;
            putIfMatch0(topmap, newkvs, key, old_unboxed, null);

            // ---
            // Finally, now that any old value is exposed in the new table, we can
            // forever hide the old-table value by slapping a TOMBPRIME down.  This
            // will stop other threads from uselessly attempting to copy this slot
            // (i.e., it's a speed optimization not a correctness issue).
            while (oldval != TOMBPRIME && !CAS_val(oldkvs, idx, oldval, TOMBPRIME))
                oldval = val(oldkvs, idx);

            return oldval != TOMBPRIME; // True if we slammed the TOMBPRIME down
        } // end copy_slot
    } // End of CHM

    // --- Snapshot ------------------------------------------------------------
    // The main class for iterating over the NBHM.  It "snapshots" a clean
    // view of the K/V array.
    private class SnapshotV implements Iterator<TypeV>, Enumeration<TypeV> {

        final Object[] _sskvs;

        public SnapshotV() {
            // Spin until we observe a top-level table with no resize in progress;
            // helping any in-flight copy to completion first guarantees a single
            // stable array to iterate over.
            while (true) {           // Verify no table-copy-in-progress
                Object[] topkvs = _kvs;
                CHM topchm = chm(topkvs);
                if (topchm._newkvs == null) { // No table-copy-in-progress
                    // The "linearization point" for the iteration.  Every key in this
                    // table will be visited, but keys added later might be skipped or
                    // even be added to a following table (also not iterated over).
                    _sskvs = topkvs;
                    break;
                }
                // Table copy in-progress - so we cannot get a clean iteration.  We
                // must help finish the table copy before we can start iterating.
                topchm.help_copy_impl(NonBlockingHashMap.this, topkvs, true);
            }
            // Warm-up the iterator: pre-compute the first <_nextK,_nextV> pair so
            // hasNext() can answer without scanning.
            next();
        }

        int length() {
            return len(_sskvs);
        }

        Object key(int idx) {
            return NonBlockingHashMap.key(_sskvs, idx);
        }

        private int _idx;              // Varies from 0-keys.length

        private Object _nextK, _prevK; // Last 2 keys found

        private TypeV _nextV, _prevV; // Last 2 values found

        public boolean hasNext() {
            return _nextV != null;
        }

        public TypeV next() {
            // 'next' actually knows what the next value will be - it had to
            // figure that out last go-around lest 'hasNext' report true and
            // some other thread deleted the last value.  Instead, 'next'
            // spends all its effort finding the key that comes after the
            // 'next' key.
            if (_idx != 0 && _nextV == null) throw new NoSuchElementException();
            _prevK = _nextK;          // This will become the previous key
            _prevV = _nextV;          // This will become the previous value
            _nextV = null;            // We have no more next-key
            // Attempt to set <_nextK,_nextV> to the next K,V pair.
            // _nextV is the trigger: stop searching when it is != null
            while (_idx < length()) {  // Scan array
                _nextK = key(_idx++); // Get a key that definitely is in the set (for the moment!)
                if (_nextK != null
                    && // Found something?
                    _nextK != TOMBSTONE
                    && (_nextV = get(_nextK)) != null)
                    break;                // Got it!  _nextK is a valid Key
            }                         // Else keep scanning
            return _prevV;            // Return current value.
        }

        public void removeKey() {
            if (_prevV == null) throw new IllegalStateException();
            NonBlockingHashMap.this.putIfMatch(_prevK, TOMBSTONE, NO_MATCH_OLD);
            _prevV = null;
        }

        @Override
        public void remove() {
            // NOTE: it would seem logical that value removal will semantically mean removing the matching value for the
            // mapping <k,v>, but the JDK always removes by key, even when the value has changed.
            removeKey();
        }

        public TypeV nextElement() {
            return next();
        }

        public boolean hasMoreElements() {
            return hasNext();
        }
    }

    /** Returns a stable snapshot of the raw K/V array (helping finish any copy first). */
    public Object[] raw_array() {
        final SnapshotV snapshot = new SnapshotV();
        return snapshot._sskvs;
    }

    /**
     * Returns an enumeration of the values in this table.
     *
     * @return an enumeration of the values in this table
     * @see #values()
     */
    public Enumeration<TypeV> elements() {
        final SnapshotV snapshot = new SnapshotV();
        return snapshot;
    }

    // --- values --------------------------------------------------------------
    /** Returns a {@link Collection} view of the values contained in this map.
     * The collection is backed by the map, so changes to the map are reflected
     * in the collection, and vice-versa. The collection supports element
     * removal, which removes the corresponding mapping from this map, via the
     * &#60;tt&#62;Iterator.remove&#60;/tt&#62;, &#60;tt&#62;Collection.remove&#60;/tt&#62;,
     * &#60;tt&#62;removeAll&#60;/tt&#62;, &#60;tt&#62;retainAll&#60;/tt&#62;, and &#60;tt&#62;clear&#60;/tt&#62; operations.
     * It does not support the &#60;tt&#62;add&#60;/tt&#62; or &#60;tt&#62;addAll&#60;/tt&#62; operations.
     *
     * <p>
     * The view's &#60;tt&#62;iterator&#60;/tt&#62; is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException}, and guarantees
     * to traverse elements as they existed upon construction of the iterator,
     * and may (but is not guaranteed to) reflect any modifications subsequent
     * to construction. */
    @Override
    public Collection<TypeV> values() {
        // Live view over the map: reads and removals pass straight through.
        return new AbstractCollection<TypeV>() {
            @Override
            public Iterator<TypeV> iterator() {
                return new SnapshotV();
            }

            @Override
            public int size() {
                return NonBlockingHashMap.this.size();
            }

            @Override
            public boolean contains(Object v) {
                return NonBlockingHashMap.this.containsValue(v);
            }

            @Override
            public void clear() {
                NonBlockingHashMap.this.clear();
            }
        };
    }

    // --- keySet --------------------------------------------------------------
    private class SnapshotK implements Iterator<TypeK>, Enumeration<TypeK> {

        // Delegate that does the real scanning over a stable K/V snapshot.
        final SnapshotV _ss;

        public SnapshotK() {
            _ss = new SnapshotV();
        }

        @Override
        public boolean hasNext() {
            return _ss.hasNext();
        }

        @Override
        public TypeK next() {
            _ss.next();                // Advance the underlying value iterator...
            return (TypeK) _ss._prevK; // ...then report the key it just stepped past.
        }

        @Override
        public void remove() {
            _ss.removeKey();
        }

        // Enumeration support: pure delegation to the Iterator methods.
        @Override
        public TypeK nextElement() {
            return next();
        }

        @Override
        public boolean hasMoreElements() {
            return hasNext();
        }
    }

    /**
     * Returns an enumeration of the keys in this table.
     *
     * @return an enumeration of the keys in this table
     * @see #keySet()
     */
    public Enumeration<TypeK> keys() {
        final SnapshotK snapshot = new SnapshotK();
        return snapshot;
    }

    /** Returns a {@link Set} view of the keys contained in this map. The set
     * is backed by the map, so changes to the map are reflected in the set,
     * and vice-versa. The set supports element removal, which removes the
     * corresponding mapping from this map, via the &#60;tt&#62;Iterator.remove&#60;/tt&#62;,
     * &#60;tt&#62;Set.remove&#60;/tt&#62;, &#60;tt&#62;removeAll&#60;/tt&#62;, &#60;tt&#62;retainAll&#60;/tt&#62;, and
     * &#60;tt&#62;clear&#60;/tt&#62; operations. It does not support the &#60;tt&#62;add&#60;/tt&#62; or
     * &#60;tt&#62;addAll&#60;/tt&#62; operations.
     *
     * <p>
     * The view's &#60;tt&#62;iterator&#60;/tt&#62; is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException}, and guarantees
     * to traverse elements as they existed upon construction of the iterator,
     * and may (but is not guaranteed to) reflect any modifications subsequent
     * to construction. */
    @Override
    public Set<TypeK> keySet() {
        // Live view over the map's keys: reads and removals pass straight through.
        return new AbstractSet<TypeK>() {
            @Override
            public void clear() {
                NonBlockingHashMap.this.clear();
            }

            @Override
            public int size() {
                return NonBlockingHashMap.this.size();
            }

            @Override
            public boolean contains(Object k) {
                return NonBlockingHashMap.this.containsKey(k);
            }

            @Override
            public boolean remove(Object k) {
                return NonBlockingHashMap.this.remove(k) != null;
            }

            @Override
            public Iterator<TypeK> iterator() {
                return new SnapshotK();
            }
            // This is an efficient implementation of toArray instead of the standard
            // one.  In particular it uses a smart iteration over the NBHM.

            @Override
            public <T> T[] toArray(T[] a) {
                // Snapshot the raw K/V array (helps finish any copy-in-progress first).
                Object[] kvs = raw_array();
                // Estimate size of array; be prepared to see more or fewer elements
                int sz = size();
                T[] r = a.length >= sz ? a
                    : (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), sz);
                // Fast efficient element walk over the live key slots.
                int j = 0;
                for (int i = 0; i < len(kvs); i++) {
                    Object K = key(kvs, i);
                    Object V = Prime.unbox(val(kvs, i));
                    if (K != null && K != TOMBSTONE && V != null && V != TOMBSTONE) {
                        if (j >= r.length) { // Concurrent inserts outran the estimate; grow.
                            int sz2 = (int) Math.min(Integer.MAX_VALUE - 8, ((long) j) << 1);
                            if (sz2 <= r.length) throw new OutOfMemoryError("Required array size too large");
                            r = Arrays.copyOf(r, sz2);
                        }
                        r[j++] = (T) K;
                    }
                }
                if (j <= a.length) {   // Fit in the original array?
                    if (a != r) System.arraycopy(r, 0, a, 0, j);
                    // Null-terminate the array we actually RETURN ('a'), per the
                    // Collection.toArray(T[]) contract.  The previous code wrote the
                    // null into 'r', which left 'a' unterminated whenever a != r.
                    if (j < a.length) a[j] = null;
                    return a;             // Return the original
                }
                return Arrays.copyOf(r, j);
            }
        };
    }

    // --- entrySet ------------------------------------------------------------
    // Warning: Each call to 'next' in this iterator constructs a new NBHMEntry.
    private class NBHMEntry extends AbstractEntry<TypeK, TypeV> {

        NBHMEntry(final TypeK k, final TypeV v) {
            super(k, v);
        }

        /** Writes through to the map (plain put), also refreshing the cached value. */
        public TypeV setValue(final TypeV newVal) {
            if (newVal == null) {
                throw new NullPointerException();
            }
            _val = newVal;
            return put(_key, newVal);
        }
    }

    private class SnapshotE implements Iterator<Map.Entry<TypeK, TypeV>> {

        // Delegate that does the real scanning over a stable K/V snapshot.
        final SnapshotV _ss;

        public SnapshotE() {
            _ss = new SnapshotV();
        }

        @Override
        public boolean hasNext() {
            return _ss.hasNext();
        }

        /** Builds a fresh NBHMEntry per call - entries are not cached or reused. */
        @Override
        public Map.Entry<TypeK, TypeV> next() {
            _ss.next();
            return new NBHMEntry((TypeK) _ss._prevK, _ss._prevV);
        }

        @Override
        public void remove() {
            // NOTE: it would seem logical that entry removal will semantically mean removing the matching pair <k,v>, but
            // the JDK always removes by key, even when the value has changed.
            _ss.removeKey();
        }
    }

    /** Returns a {@link Set} view of the mappings contained in this map. The
     * set is backed by the map, so changes to the map are reflected in the
     * set, and vice-versa. The set supports element removal, which removes
     * the corresponding mapping from the map, via the
     * &#60;tt&#62;Iterator.remove&#60;/tt&#62;, &#60;tt&#62;Set.remove&#60;/tt&#62;, &#60;tt&#62;removeAll&#60;/tt&#62;,
     * &#60;tt&#62;retainAll&#60;/tt&#62;, and &#60;tt&#62;clear&#60;/tt&#62; operations. It does not support
     * the &#60;tt&#62;add&#60;/tt&#62; or &#60;tt&#62;addAll&#60;/tt&#62; operations.
     *
     * <p>
     * The view's &#60;tt&#62;iterator&#60;/tt&#62; is a "weakly consistent" iterator
     * that will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * <p>
     * <strong>Warning:</strong> the iterator associated with this Set
     * requires the creation of {@link java.util.Map.Entry} objects with each
     * iteration. The NonBlockingHashMap does not normally create or
     * using {@link java.util.Map.Entry} objects so they will be created soley
     * to support this iteration. Iterating using Map#keySet or Map##values will be more efficient.
     */
    @Override
    public Set<Map.Entry<TypeK, TypeV>> entrySet() {
        // Live view over the map's entries: reads and removals pass straight through.
        return new AbstractSet<Map.Entry<TypeK, TypeV>>() {
            @Override
            public Iterator<Map.Entry<TypeK, TypeV>> iterator() {
                return new SnapshotE();
            }

            @Override
            public int size() {
                return NonBlockingHashMap.this.size();
            }

            @Override
            public void clear() {
                NonBlockingHashMap.this.clear();
            }

            @Override
            public boolean contains(final Object o) {
                if (o instanceof Map.Entry) {
                    final Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
                    final TypeV v = get(e.getKey());
                    return v != null && v.equals(e.getValue());
                }
                return false;
            }

            @Override
            public boolean remove(final Object o) {
                if (o instanceof Map.Entry) {
                    final Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
                    return NonBlockingHashMap.this.remove(e.getKey(), e.getValue());
                }
                return false;
            }
        };
    }

    // --- writeObject -------------------------------------------------------
    // Write a NBHM to a stream
    // Serialize the map as a flat stream of <key,value> pairs, terminated by
    // a pair of nulls (no field state is written - the table is rebuilt on read).
    private void writeObject(java.io.ObjectOutputStream s) throws IOException {
        s.defaultWriteObject();       // No default fields to persist
        for (Object key : keySet()) {
            final Object value = get(key); // Do an official 'get'
            s.writeObject(key);
            s.writeObject(value);
        }
        // Two-null sentinel marks end-of-data.
        s.writeObject(null);
        s.writeObject(null);
    }

    // --- readObject --------------------------------------------------------
    // Read a CHM from a stream
    // Rebuild the map from the flat <key,value> pair stream written by
    // writeObject; a null key marks end-of-data.
    private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException {
        s.defaultReadObject();        // No default fields were written
        initialize(MIN_SIZE);         // Fresh empty table
        while (true) {
            final TypeK k = (TypeK) s.readObject();
            final TypeV v = (TypeV) s.readObject();
            if (k == null) {
                break;                // Hit the sentinel
            }
            put(k, v);                // Insert with an official put
        }
    }

    /** Simple key/value pair base for Map.Entry implementations; key is immutable. */
    protected abstract class AbstractEntry<TypeK, TypeV> implements Map.Entry<TypeK, TypeV> {

        /** Strongly typed key */
        protected final TypeK _key;

        /** Strongly typed value */
        protected TypeV _val;

        public AbstractEntry(final TypeK key, final TypeV val) {
            _key = key;
            _val = val;
        }

        public AbstractEntry(final Map.Entry<TypeK, TypeV> e) {
            this(e.getKey(), e.getValue());
        }

        /** Return key */
        @Override
        public TypeK getKey() {
            return _key;
        }

        /** Return val */
        @Override
        public TypeV getValue() {
            return _val;
        }

        /** Return "key=val" string */
        @Override
        public String toString() {
            return _key + "=" + _val;
        }

        /** Equal if the underlying key and value are equal */
        @Override
        public boolean equals(final Object o) {
            if (o instanceof Map.Entry) {
                final Map.Entry e = (Map.Entry) o;
                return eq(_key, e.getKey()) && eq(_val, e.getValue());
            }
            return false;
        }

        /** Compute <code>"key.hashCode() ^ val.hashCode()"</code> */
        @Override
        public int hashCode() {
            final int kh = (_key == null) ? 0 : _key.hashCode();
            final int vh = (_val == null) ? 0 : _val.hashCode();
            return kh ^ vh;
        }

        // Null-safe equality helper.
        private boolean eq(final Object a, final Object b) {
            return a == null ? b == null : a.equals(b);
        }
    }

    /**
     * A simple high-performance counter.  Concurrent adds are striped across a
     * power-of-2 array of long slots (the nested {@link CAT}) so updating
     * threads rarely contend on the same cache line; reading the total value
     * requires summing every slot, so reads are only approximate while writers
     * are active.
     */
    protected static class ConcurrentAutoTable implements Serializable {

        // --- public interface ---
        /**
         * Add the given value to the current counter value.  Concurrent updates
         * will not be lost, but addAndGet or getAndAdd are not implemented because
         * the total counter value (i.e., {@link #get}) is not atomically updated.
         * Updates are striped across an array of counters to avoid cache contention
         * and has been tested with performance scaling linearly up to 768 CPUs.
         * @param x long
         */
        public void add(long x) {
            add_if(x);
        }

        /** {@link #add} with -1 */
        public void decrement() {
            add_if(-1L);
        }

        /** {@link #add} with +1 */
        public void increment() {
            add_if(1L);
        }

        /**
         * Atomically set the sum of the striped counters to the specified value.
         * Rather more expensive than a simple store: the whole CAT chain is
         * replaced with a fresh small table holding only the new value.
         * @param x long
         */
        public void set(long x) {
            CAT newcat = new CAT(null, 4, x);
            // Spin until CAS works
            while (!CAS_cat(_cat, newcat)) {/* empty */
            }
        }

        /**
         * Current value of the counter.  Since other threads are updating furiously
         * the value is only approximate, but it includes all counts made by the
         * current thread.  Requires a pass over the internally striped counters.
         * @return long
         */
        public long get() {
            return _cat.sum();
        }

        /** Same as {@link #get}, included for completeness.
         * @return  int */
        public int intValue() {
            return (int) _cat.sum();
        }

        /** Same as {@link #get}, included for completeness.
         * @return  long */
        public long longValue() {
            return _cat.sum();
        }

        /**
         * A cheaper {@link #get}.  Updated only once/millisecond, but as fast as a
         * simple load instruction when not updating.
         * @return long
         */
        public long estimate_get() {
            return _cat.estimate_sum();
        }

        /**
         * Return the counter's {@code long} value converted to a string.
         * @return String
         */
        @Override
        public String toString() {
            return _cat.toString();
        }

        /**
         * A more verbose print than {@link #toString}, showing internal structure.
         * Useful for debugging.
         */
        public void print() {
            _cat.print();
        }

        /**
         * Return the internal counter striping factor.  Useful for diagnosing
         * performance problems.
         * @return int
         */
        public int internal_size() {
            return _cat._t.length;
        }

        // Only add 'x' to some slot in table, hinted at by 'hash'.  The sum can
        // overflow.  Value is CAS'd so no counts are lost.  The CAS is retried until
        // it succeeds.  Returned value is the old value.
        private long add_if(long x) {
            return _cat.add_if(x, hash(), this);
        }

        // The underlying array of concurrently updated long counters
        private volatile CAT _cat = new CAT(null, 16/* Start Small, Think Big! */, 0L);

        // Updater used to CAS the whole CAT chain (on resize or set()).
        private static AtomicReferenceFieldUpdater<ConcurrentAutoTable, CAT> _catUpdater
            = AtomicReferenceFieldUpdater.newUpdater(ConcurrentAutoTable.class, CAT.class, "_cat");

        private boolean CAS_cat(CAT oldcat, CAT newcat) {
            return _catUpdater.compareAndSet(this, oldcat, newcat);
        }

        // Hash spreader: picks a per-thread stripe hint.
        private static int hash() {
            //int h = (int)Thread.currentThread().getId();
            int h = System.identityHashCode(Thread.currentThread());
            return h << 3;                // Pad out cache lines.  The goal is to avoid cache-line contention
        }

        // --- CAT -----------------------------------------------------------------
        // A chained table of striped long counters.  When one table gets too
        // contended, a doubled-size table is chained in front; sum() walks the chain.
        private static class CAT implements Serializable {

            // Unsafe crud: get a function which will CAS arrays
            private static final int _Lbase = UNSAFE == null ? 0 : UNSAFE.arrayBaseOffset(long[].class);

            private static final int _Lscale = UNSAFE == null ? 0 : UNSAFE.arrayIndexScale(long[].class);

            // Byte offset of slot 'i' inside a long[] for Unsafe access.
            private static long rawIndex(long[] ary, int i) {
                assert i >= 0 && i < ary.length;
                return _Lbase + (i * (long) _Lscale);
            }

            private static boolean CAS(long[] A, int idx, long old, long nnn) {
                return UNSAFE.compareAndSwapLong(A, rawIndex(A, idx), old, nnn);
            }

            //volatile long _resizers;    // count of threads attempting a resize
            //static private final AtomicLongFieldUpdater<CAT> _resizerUpdater =
            //  AtomicLongFieldUpdater.newUpdater(CAT.class, "_resizers");
            private final CAT _next;    // Older, smaller table in the chain (or null)

            private volatile long _fuzzy_sum_cache; // Cached sum for estimate_sum()

            private volatile long _fuzzy_time;      // Millisecond stamp of the cached sum

            private static final int MAX_SPIN = 1;  // CAS failures tolerated before doubling

            private final long[] _t;     // Power-of-2 array of longs

            CAT(CAT next, int sz, long init) {
                _next = next;
                _t = new long[sz];
                _t[0] = init;
            }

            // Only add 'x' to some slot in table, hinted at by 'hash'.  The sum can
            // overflow.  Value is CAS'd so no counts are lost.  The CAS is attempted
            // ONCE.
            public long add_if(long x, int hash, ConcurrentAutoTable master) {
                final long[] t = _t;
                final int idx = hash & (t.length - 1);
                // Peel loop; try once fast
                long old = t[idx];
                final boolean ok = CAS(t, idx, old, old + x);
                if (ok) return old;      // Got it
                // Try harder
                int cnt = 0;
                while (true) {
                    old = t[idx];
                    if (CAS(t, idx, old, old + x)) break; // Got it!
                    cnt++;
                }
                if (cnt < MAX_SPIN) return old; // Allowable spin loop count
                if (t.length >= 1024 * 1024) return old; // too big already

                // Too much contention; double array size in an effort to reduce contention
                //long r = _resizers;
                //final int newbytes = (t.length<<1)<<3/*word to bytes*/;
                //while( !_resizerUpdater.compareAndSet(this,r,r+newbytes) )
                //  r = _resizers;
                //r += newbytes;
                if (master._cat != this) return old; // Already doubled, don't bother
                //if( (r>>17) != 0 ) {      // Already too much allocation attempts?
                //  // We could use a wait with timeout, so we'll wakeup as soon as the new
                //  // table is ready, or after the timeout in any case.  Annoyingly, this
                //  // breaks the non-blocking property - so for now we just briefly sleep.
                //  try { Thread.sleep(r>>17); } catch( InterruptedException e ) { }
                //  if( master._cat != this ) return old;

                CAT newcat = new CAT(this, t.length * 2, 0);
                // Take 1 stab at updating the CAT with the new larger size.  If this
                // fails, we assume some other thread already expanded the CAT - so we
                // do not need to retry until it succeeds.
                while (master._cat == this && !master.CAS_cat(this, newcat)) {/* empty */
                }
                return old;
            }

            // Return the current sum of all things in the table.  Writers can be
            // updating the table furiously, so the sum is only locally accurate.
            public long sum() {
                long sum = _next == null ? 0 : _next.sum(); // Recursively get cached sum
                final long[] t = _t;
                for (long cnt : t) sum += cnt;
                return sum;
            }

            // Fast fuzzy version.  Used a cached value until it gets old, then re-up
            // the cache.
            public long estimate_sum() {
                // For short tables, just do the work
                if (_t.length <= 64) return sum();
                // For bigger tables, periodically freshen a cached value
                long millis = System.currentTimeMillis();
                if (_fuzzy_time != millis) { // Time marches on?
                    _fuzzy_sum_cache = sum(); // Get sum the hard way
                    _fuzzy_time = millis;   // Indicate freshness of cached value
                }
                return _fuzzy_sum_cache;  // Return cached sum
            }

            public String toString() {
                return Long.toString(sum());
            }

            // Debug print of every stripe in every chained table.
            public void print() {
                long[] t = _t;
                System.out.print("[" + t[0]);
                for (int i = 1; i < t.length; i++)
                    System.out.print("," + t[i]);
                System.out.print("]");
                if (_next != null) _next.print();
            }
        }
    }
}
